home *** CD-ROM | disk | FTP | other *** search
/ Personal Computer World 2009 February / PCWFEB09.iso / Software / Resources / Chat & Communication / Digsby build 37 / digsby_setup.exe / lib / jabber / threadstream.pyo (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2008-10-13  |  18.6 KB  |  652 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyo (Python 2.5)
  3.  
  4. from __future__ import with_statement
  5. import pyxmpp.error as pyxmpp
  6. from dns import resolver
  7. from pyxmpp.exceptions import ClientStreamError
  8. from util.threads.threadpool2 import threaded
  9. from util.callbacks import callsback
  10. from util import GetSocketType, Delegate
  11. from pyxmpp.exceptions import LegacyAuthenticationError
  12. from pyxmpp.streambase import STREAM_NS
  13. from util.diagnostic import Diagnostic
  14. import libxml2
  15. import time
  16. import Queue
  17. import logging
  18. import socket
  19. from jabber.threadstreamsocket import ThreadStreamSocket
  20. from pyxmpp.jabber.clientstream import LegacyClientStream
  21. from pyxmpp.exceptions import StreamError, StreamEncryptionRequired, HostMismatch, ProtocolError, TLSError
  22. from pyxmpp.exceptions import FatalStreamError, StreamParseError, StreamAuthenticationError, SASLAuthenticationFailed
  23. from pyxmpp.jid import JID
  24. from pyxmpp import resolver
  25. from common import netcall
  26. from threading import currentThread
  27. import traceback
  28. import sys
  29. log = logging.getLogger('ThreadStream')
  30. outdebug = logging.getLogger('ThreadStream.out').debug
  31. outdebug_s = getattr(logging.getLogger('ThreadStream.out'), 'debug_s', outdebug)
  32. indebug = logging.getLogger('ThreadStream.in').debug
  33. indebug_s = getattr(logging.getLogger('ThreadStream.in'), 'debug_s', outdebug)
  34.  
  35. try:
  36.     import M2Crypto
  37.     if M2Crypto.version_info < (0, 16):
  38.         tls_available = 0
  39.     else:
  40.         from M2Crypto import SSL
  41.         from M2Crypto.SSL import SSLError
  42.         import M2Crypto.SSL.cb as M2Crypto
  43.         tls_available = 1
  44. except ImportError:
  45.     tls_available = 0
  46.  
  47.  
  48. class ThreadStream(LegacyClientStream):
  49.     
  50.     def __init__(self, *a, **k):
  51.         LegacyClientStream.__init__(self, *a, **k)
  52.         self._ThreadStream__logger = logging.getLogger('ThreadStream')
  53.         self._ThreadStream__shutdown = False
  54.         self.on_incoming_node = Delegate()
  55.         self.on_outgoing_node = Delegate()
  56.  
  57.     
  58.     def stanza(self, _unused, node):
  59.         self.on_incoming_node(node)
  60.         LegacyClientStream.stanza(self, _unused, node)
  61.  
  62.     
  63.     def _write_node(self, xmlnode):
  64.         self.on_outgoing_node(xmlnode)
  65.         LegacyClientStream._write_node(self, xmlnode)
  66.  
  67.     
  68.     def write_raw(self, data):
  69.         (None, netcall)((lambda : LegacyClientStream.write_raw(self, data)))
  70.  
  71.     
  72.     def idle(self):
  73.         (netcall,)((lambda : LegacyClientStream.idle(self)))
  74.  
  75.     
  76.     def send(self, stanza):
  77.         (None, netcall)((lambda : LegacyClientStream.send(self, stanza)))
  78.  
  79.     
  80.     def _write_raw(self, data):
  81.         if sys.DEV and currentThread().getName() != 'AsyncoreThread':
  82.             
  83.             try:
  84.                 raise AssertionError, 'bad thread for _write_raw: %r' % currentThread().getName()
  85.             except AssertionError:
  86.                 traceback.print_exc()
  87.                 traceback.print_stack()
  88.                 import wx
  89.                 
  90.                 def do_submit():
  91.                     d = Diagnostic(description = 'Automated: Woah, bad thread')
  92.                     d.prepare_data()
  93.                     d.do_post()
  94.                     profile = profile
  95.                     import common
  96.                     uname = profile.username
  97.                     del profile
  98.                     wx.MessageBox('Hey %s! Something crazy just happened!\nI submitted a bug report for you. - Chris' % uname)
  99.  
  100.                 wx.CallLater(3000, do_submit)
  101.                 raise 
  102.             except:
  103.                 None<EXCEPTION MATCH>AssertionError
  104.             
  105.  
  106.         None<EXCEPTION MATCH>AssertionError
  107.         outdebug_s('OUT: %r', data)
  108.         
  109.         try:
  110.             self.socket.push(data)
  111.         except Exception:
  112.             e = None
  113.             self.handle_error(e)
  114.  
  115.         outdebug('OUT: done')
  116.  
  117.     
  118.     def fileno(self):
  119.         self.lock.__enter__()
  120.         
  121.         try:
  122.             return self.socket._fileno
  123.         finally:
  124.             pass
  125.  
  126.  
  127.     
  128.     def connect(self, server = None, port = None):
  129.         outdebug('connect')
  130.         self.lock.__enter__()
  131.         
  132.         try:
  133.             self._connect1(server, port)
  134.         finally:
  135.             pass
  136.  
  137.  
  138.     
  139.     def _connect1(self, server = None, port = None):
  140.         outdebug('_connect1')
  141.         if not (self.my_jid.node) or not (self.my_jid.resource):
  142.             raise ClientStreamError, 'Client JID must have username and resource'
  143.         
  144.         if not server:
  145.             server = self.server
  146.         
  147.         if not port:
  148.             port = self.port
  149.         
  150.         if server:
  151.             self._ThreadStream__logger.debug('server: %r', (server, port))
  152.             service = None
  153.         else:
  154.             service = 'xmpp-client'
  155.         if port is None:
  156.             port = 5222
  157.         
  158.         if server is None:
  159.             self._ThreadStream__logger.debug('server: %r', (server, port))
  160.             server = self.my_jid.domain
  161.         
  162.         self.me = self.my_jid
  163.         
  164.         def connect_failed():
  165.             self.owner.set_offline(self.owner.Reasons.CONN_FAIL)
  166.  
  167.         self._connect2(server, port, service, self.my_jid.domain, sck_cls = GetSocketType())
  168.  
  169.     
  170.     def _connect2(self, addr1, port1, service = None, to = None, sck_cls = socket.SocketType):
  171.         outdebug('_connect2')
  172.         self._ThreadStream__logger.debug('server: %r', (addr1, port1))
  173.         if to is None:
  174.             to = str(addr1)
  175.         
  176.         if service is not None:
  177.             self.state_change('resolving srv', (addr1, service))
  178.             
  179.             try:
  180.                 addrs = resolver.resolve_srv(addr1, service)
  181.             except Exception:
  182.                 traceback.print_exc()
  183.                 addrs = []
  184.  
  185.             if not addrs:
  186.                 addrs = [
  187.                     (addr1, port1)]
  188.             else:
  189.                 addrs.append((addr1, port1))
  190.         else:
  191.             addrs = [
  192.                 (addr1, port1)]
  193.         msg = None
  194.         self._ThreadStream__logger.debug('addrs: %r', addrs)
  195.         for addr, port in addrs:
  196.             if type(addr) in (str, unicode):
  197.                 self.state_change('resolving', addr)
  198.             
  199.             s = None
  200.             
  201.             try:
  202.                 resolved = resolver.getaddrinfo(addr, port, 0, socket.SOCK_STREAM)
  203.             except Exception:
  204.                 traceback.print_exc()
  205.                 resolved = []
  206.  
  207.             resolved.append((2, 1, 0, '_unused', (addr, port)))
  208.             for res in resolved:
  209.                 (family, socktype, proto, _unused, sockaddr) = res
  210.                 self._ThreadStream__logger.debug('res: %r', res)
  211.                 
  212.                 try:
  213.                     s = sck_cls(family, socktype, proto)
  214.                     s.settimeout(10)
  215.                     self.state_change('connecting', sockaddr)
  216.                     s.connect(sockaddr)
  217.                     if self.owner.do_ssl:
  218.                         ctx = SSL.Context()
  219.                         ctx.set_verify(SSL.verify_none, 10)
  220.                         s.setblocking(True)
  221.                         ssl = SSL.Connection(ctx, s)
  222.                         ssl.setup_ssl()
  223.                         ssl.set_connect_state()
  224.                         ssl.connect_ssl()
  225.                         s.setblocking(False)
  226.                         s = ssl
  227.                         s.setblocking(False)
  228.                     
  229.                     self.state_change('connected', sockaddr)
  230.                 except (socket.error, SSLError):
  231.                     msg = None
  232.                     self._ThreadStream__logger.debug('Connect to %r failed: %r', sockaddr, msg)
  233.                     traceback.print_exc()
  234.                     if s:
  235.                         s.close()
  236.                         s = None
  237.                         continue
  238.                     continue
  239.  
  240.             
  241.             if s:
  242.                 self._ThreadStream__logger.debug('connected to: %r', (addr, port))
  243.                 break
  244.                 continue
  245.         
  246.         if not s:
  247.             if msg:
  248.                 self._ThreadStream__logger.debug('failed to connect to %r: %r', (addr, port), msg)
  249.                 raise socket.error, msg
  250.             else:
  251.                 self._ThreadStream__logger.debug('failed to connect to %r: unknown reason', (addr, port))
  252.                 raise FatalStreamError, 'Cannot connect'
  253.         
  254.         self.addr = addr
  255.         self.port = port
  256.         self.owner.lock.__enter__()
  257.         
  258.         try:
  259.             if self.owner.connect_killed == True:
  260.                 raise FatalStreamError, 'Connect Killed'
  261.         finally:
  262.             pass
  263.  
  264.         self._connect_socket(s, to)
  265.         self.last_keepalive = time.time()
  266.  
  267.     
  268.     def closed(self):
  269.         self.owner.fatal_error()
  270.         self.state_change('disconnected', self.peer)
  271.  
  272.     
  273.     def closed_dead(self):
  274.         self.owner.fatal_error()
  275.         self.close(False)
  276.         self.owner.disconnected()
  277.  
  278.     
  279.     def __connect_error(self):
  280.         pass
  281.  
  282.     
  283.     def _connect_socket(self, sock, to = None):
  284.         logging.getLogger('ThreadStream').debug('connecting')
  285.         new_sock = ThreadStreamSocket(sock, self._feed_reader, 100, self.closed, self.closed_dead, ssl = self.owner.do_ssl)
  286.         (None, None, netcall)((lambda : LegacyClientStream._connect_socket(self, new_sock, to)))
  287.  
  288.     
  289.     def _loop_iter(self, timeout):
  290.         pass
  291.  
  292.     
  293.     def _process(self):
  294.         pass
  295.  
  296.     
  297.     def _read(self):
  298.         pass
  299.  
  300.     
  301.     def _process_tls_node(self, xmlnode):
  302.         if not (self.tls_settings) or not tls_available:
  303.             self._ThreadStream__logger.debug('Unexpected TLS node: %r' % xmlnode.serialize())
  304.             return False
  305.         
  306.         if self.initiator:
  307.             if xmlnode.name == 'failure':
  308.                 raise TLSNegotiationFailed, 'Peer failed to initialize TLS connection'
  309.             elif xmlnode.name != 'proceed' or not (self.tls_requested):
  310.                 self._ThreadStream__logger.debug('Unexpected TLS node: %r' % xmlnode.serialize())
  311.                 return False
  312.             
  313.             self.tls_requested = 0
  314.             self._make_tls_connection(success = self.finish_process, error = self.fail_process)
  315.         
  316.         return True
  317.  
  318.     
  319.     def fail_process(self):
  320.         self.owner.fatal_error()
  321.         self.close()
  322.  
  323.     
  324.     def finish_process(self):
  325.         self.socket = self.tls
  326.         self._ThreadStream__logger.debug('Restarting XMPP stream')
  327.         self._restart_stream()
  328.         return True
  329.  
  330.     
  331.     def _make_tls_connection(self, callback = None):
  332.         ctx = None
  333.         
  334.         try:
  335.             if not tls_available or not (self.tls_settings):
  336.                 raise TLSError, 'TLS is not available'
  337.             
  338.             tlssettings = self.tls_settings
  339.             self.state_change('tls connecting', self.peer)
  340.             self._ThreadStream__logger.debug('Creating TLS context')
  341.             ctx = None if tlssettings.ctx else SSL.Context('tlsv1')
  342.             verify_callback = tlssettings.verify_callback
  343.             if not verify_callback:
  344.                 verify_callback = self.tls_default_verify_callback
  345.             
  346.             if tlssettings.verify_peer:
  347.                 self._ThreadStream__logger.debug('verify_peer, verify_callback: %r', verify_callback)
  348.                 ctx.set_verify(SSL.verify_peer, 10, verify_callback)
  349.             else:
  350.                 ctx.set_verify(SSL.verify_none, 10)
  351.             if tlssettings.cert_file:
  352.                 ctx.use_certificate_chain_file(tlssettings.cert_file)
  353.                 if tlssettings.key_file:
  354.                     ctx.use_PrivateKey_file(tlssettings.key_file)
  355.                 else:
  356.                     ctx.use_PrivateKey_file(tlssettings.cert_file)
  357.                 ctx.check_private_key()
  358.             
  359.             if tlssettings.cacert_file:
  360.                 
  361.                 try:
  362.                     ctx.load_verify_location(tlssettings.cacert_file)
  363.                 except AttributeError:
  364.                     ctx.load_verify_locations(tlssettings.cacert_file)
  365.                 except:
  366.                     None<EXCEPTION MATCH>AttributeError
  367.                 
  368.  
  369.             None<EXCEPTION MATCH>AttributeError
  370.         except:
  371.             callback.error()
  372.             return None
  373.  
  374.         self.callback = callback
  375.         self.socket.make_tls(ctx, success = self.tls_done, error = self.tls_fail)
  376.  
  377.     _make_tls_connection = callsback(_make_tls_connection)
  378.     tls_fail = fail_process
  379.     
  380.     def tls_done(self):
  381.         self.tls = self.socket
  382.         self.state_change('tls connected', self.peer)
  383.         
  384.         try:
  385.             raise Exception
  386.         except:
  387.             pass
  388.  
  389.         self.callback.success()
  390.  
  391.     
  392.     def _got_features(self):
  393.         
  394.         try:
  395.             return LegacyClientStream._got_features(self)
  396.         except FatalStreamError:
  397.             e = None
  398.             if e.__class__ == FatalStreamError:
  399.                 self.owner.auth_failed(e.message)
  400.             else:
  401.                 raise 
  402.         except:
  403.             e.__class__ == FatalStreamError
  404.  
  405.  
  406.     
  407.     def registration_error(self, stanza):
  408.         self.lock.__enter__()
  409.         
  410.         try:
  411.             ae = None
  412.             err = stanza.get_error()
  413.             ae = err.xpath_eval('e:*', {
  414.                 'e': 'jabber:iq:auth:error' })
  415.             if ae:
  416.                 ae = ae[0].name
  417.             else:
  418.                 ae = err.get_condition().name
  419.         finally:
  420.             pass
  421.  
  422.         if self.registration_error_callback is not None:
  423.             self.registration_error_callback((ae,) + pyxmpp.error.stanza_errors[ae])
  424.         
  425.         self.registration_error_callback = None
  426.         self.registration_success_callback = None
  427.  
  428.     
  429.     def registration_success(self, stanza):
  430.         if self.registration_success_callback is not None:
  431.             self.registration_success_callback()
  432.         
  433.         self.registration_success_callback = None
  434.         self.registration_error_callback = None
  435.         _unused = stanza
  436.         self.lock.__enter__()
  437.         
  438.         try:
  439.             self.state_change('registered', self.registration_form)
  440.             if 'FORM_TYPE' in self.registration_form and self.registration_form['FORM_TYPE'].value == 'jabber:iq:register':
  441.                 if 'username' in self.registration_form:
  442.                     self.my_jid = JID(self.registration_form['username'].value, self.my_jid.domain, self.my_jid.resource)
  443.                 
  444.                 if 'password' in self.registration_form:
  445.                     self.password = self.registration_form['password']
  446.                 
  447.             
  448.             self.registration_callback = None
  449.         finally:
  450.             pass
  451.  
  452.  
  453.     
  454.     def disconnect(self):
  455.         LegacyClientStream.disconnect(self)
  456.         self.state_change('disconnected', self.peer)
  457.  
  458.     
  459.     def stream_end(self, _unused):
  460.         LegacyClientStream.stream_end(self, _unused)
  461.         self.shutdown()
  462.  
  463.     
  464.     def _send_stream_end(self):
  465.         LegacyClientStream._send_stream_end(self)
  466.         self.shutdown()
  467.  
  468.     
  469.     def shutdown(self):
  470.         if not self._ThreadStream__shutdown:
  471.             outdebug('non-Force shutdown')
  472.             self._ThreadStream__shutdown = True
  473.             if self.socket:
  474.                 outdebug('non-Force close_when_done')
  475.                 self.socket.close_when_done()
  476.             
  477.         else:
  478.             outdebug('Force shutdown')
  479.             self.close(False)
  480.  
  481.     
  482.     def close(self, do_disconnect = True):
  483.         self.lock.__enter__()
  484.         
  485.         try:
  486.             return self._close(do_disconnect)
  487.         finally:
  488.             pass
  489.  
  490.  
  491.     
  492.     def _close(self, do_disconnect = True):
  493.         if do_disconnect:
  494.             self._disconnect()
  495.         
  496.         if self.doc_in:
  497.             self.doc_in = None
  498.         
  499.         if self.features:
  500.             self.features = None
  501.         
  502.         self._reader = None
  503.         self.stream_id = None
  504.         if self.socket:
  505.             self.socket.close()
  506.         
  507.         self._reset()
  508.  
  509.     
  510.     def _process_node(self, stanza):
  511.         
  512.         try:
  513.             LegacyClientStream._process_node(self, stanza)
  514.         except SASLAuthenticationFailed:
  515.             e = None
  516.             self.owner.auth_failed(reason = e.message)
  517.             self._ThreadStream__logger.critical('SASLAuthenticationFailed')
  518.         except LegacyAuthenticationError:
  519.             e = None
  520.             self.owner.auth_failed(reason = e.message)
  521.             self._ThreadStream__logger.critical('LegacyAuthenticationError')
  522.         except FatalStreamError:
  523.             e = None
  524.             import hub
  525.             hub.get_instance().on_error(e)
  526.             self._ThreadStream__logger.critical('Stream blew up')
  527.             self.owner.fatal_error()
  528.             self.close()
  529.  
  530.  
  531.     
  532.     def error(self, descr):
  533.         self._ThreadStream__logger.critical('XML parse error: ' + descr)
  534.         self.owner.fatal_error()
  535.         self.close()
  536.  
  537.     
  538.     def fix_in_stanza(self, stanza):
  539.         LegacyClientStream.fix_in_stanza(self, stanza)
  540.         if self.initiator:
  541.             to = stanza.get_to()
  542.             if to is not None:
  543.                 p = self.peer
  544.                 pb = None if p else None
  545.                 tob = None if to else None
  546.                 if tob == pb and to == p and to == pb or tob == p:
  547.                     stanza.set_to(False)
  548.                 
  549.             
  550.         
  551.  
  552.     
  553.     def _feed_reader(self, data):
  554.         self.lock.__enter__()
  555.         
  556.         try:
  557.             if self._reader is not None:
  558.                 self._super_feed_reader(data)
  559.             else:
  560.                 self.close(False)
  561.         finally:
  562.             pass
  563.  
  564.  
  565.     
  566.     def _super_feed_reader(self, data):
  567.         indebug_s('IN: %r', data)
  568.         if data:
  569.             
  570.             try:
  571.                 r = self._reader.feed(data)
  572.                 while r:
  573.                     r = self._reader.feed('')
  574.                 if r is None:
  575.                     indebug('r was None, setting eof + disconnect')
  576.                     self.eof = 1
  577.                     self.disconnect()
  578.             except StreamParseError:
  579.                 self._send_stream_error('xml-not-well-formed')
  580.                 raise 
  581.             except:
  582.                 None<EXCEPTION MATCH>StreamParseError
  583.             
  584.  
  585.         None<EXCEPTION MATCH>StreamParseError
  586.         indebug('no data, setting eof + disconnect')
  587.         self.eof = 1
  588.         self.disconnect()
  589.         if self.eof:
  590.             indebug('eof calling stream_end')
  591.             self.stream_end(None)
  592.         
  593.  
  594.     
  595.     def stream_start(self, doc):
  596.         self.doc_in = doc
  597.         log.debug('input document: %r' % (self.doc_in.serialize(),))
  598.         
  599.         try:
  600.             r = self.doc_in.getRootElement()
  601.             if r.ns().getContent() != STREAM_NS:
  602.                 self._send_stream_error('invalid-namespace')
  603.                 raise FatalStreamError, 'Invalid namespace.'
  604.         except libxml2.treeError:
  605.             self._send_stream_error('invalid-namespace')
  606.             raise FatalStreamError, "Couldn't get the namespace."
  607.  
  608.         self.version = r.prop('version')
  609.         if self.version and self.version != '1.0':
  610.             self._send_stream_error('unsupported-version')
  611.             raise FatalStreamError, 'Unsupported protocol version.'
  612.         
  613.         to_from_mismatch = 0
  614.         if self.initiator:
  615.             self.stream_id = r.prop('id')
  616.             peer = r.prop('from')
  617.             if peer:
  618.                 peer = JID(peer)
  619.             
  620.             if self.peer:
  621.                 if peer and peer != self.peer and not unicode(self.peer).endswith(unicode(peer)):
  622.                     self._ThreadStream__logger.debug('peer hostname mismatch: %r != %r' % (peer, self.peer))
  623.                     to_from_mismatch = 1
  624.                 elif peer:
  625.                     self.peer = peer
  626.                 
  627.             else:
  628.                 self.peer = peer
  629.         else:
  630.             to = r.prop('to')
  631.             if to:
  632.                 to = self.check_to(to)
  633.                 if not to:
  634.                     self._send_stream_error('host-unknown')
  635.                     raise FatalStreamError, 'Bad "to"'
  636.                 
  637.                 self.me = JID(to)
  638.             
  639.             self._send_stream_start(self.generate_id())
  640.             self._send_stream_features()
  641.             self.state_change('fully connected', self.peer)
  642.             self._post_connect()
  643.         if not self.version:
  644.             self.state_change('fully connected', self.peer)
  645.             self._post_connect()
  646.         
  647.         if to_from_mismatch:
  648.             raise HostMismatch
  649.         
  650.  
  651.  
  652.